Flink Two-Stream and Multi-Stream Joins: Differences Between Join, IntervalJoin, and coGroup, and Their Use in Production


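1. Code tests for Flink's three joins

Contents
1. Code tests for Flink's three joins
  1.1 Data sources
  1.2 join
  1.3 intervalJoin
    1.3.1 intervalJoin API usage
    1.3.2 intervalJoin SQL usage
  1.4 coGroup
2. intervalJoin source code walkthrough
  2.1 Entering the class from the between method
  2.2 A closer look at key method 1 above, IntervalJoinOperator
  2.3 State cleanup mechanism in detail
    2.3.1 State cleanup time cleanupTime
    2.3.2 Performing the state cleanup, Buffer.remove(timestamp)
  2.4 What to take away from the source code
    2.4.1 MapState stores the state
    2.4.2 State cleanup time
3. Differences between the three joins and when to use each
4. Joining multiple streams
  4.1 Scenario 1: several infrequently updated streams joined in real time into one table (multiple dimension tables joined into one dimension table)
  4.2 Joining two streams (fact table with dimension table)
  4.3 Joining two fact tables (without TimeWindowJoin)
5. Extensions
  5.1 The Phoenix plugin for Canal
  5.2 Flink CDC as a replacement for Canal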

1.1 Data sources

(1) Left stream

Orders table (orders)

id  productName  orderTime
1   iphone       2020-04-01 10:00:00.0
2   mac          2020-04-01 10:02:00.0
3   huawei       2020-04-01 10:03:00.0
4   pad          2020-04-01 10:05:00.0

(2) Right stream

Shipments table (shipments)

shipId  orderId  status     shiptime
0       1        shipped    2020-04-01 11:00:00.0
1       2        delivered  2020-04-01 17:00:00.0
2       3        shipped    2020-04-01 12:00:00.0
3       4        shipped    2020-04-01 11:30:00.0

1.2 join

(1) Code

    import org.apache.flink.api.common.functions.JoinFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // Allowed delay: 0s
    val delay = 0
    // Window size: 4 hours
    val window = 4

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // OrdersSource, ShipmentsSource and the two watermark assigners are the author's own classes
    val left = env.addSource(new OrdersSource)
      .assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
    val right = env.addSource(new ShipmentsSource)
      .assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

    left.join(right)
      // Join key: first field of the left stream (id) equals second field of the right stream (orderId)
      .where(_._1).equalTo(_._2)
      // Tumbling window
      .window(TumblingEventTimeWindows.of(Time.hours(window)))
      // IN1 (Int,String,Long)              id productName orderTime
      // IN2 (Int,Int,String,Long)          shipId orderId status shiptime
      // OUT (Int,String,String,Long,Long)  orderId productName status orderTime shiptime
      .apply(new JoinFunction[(Int, String, Long), (Int, Int, String, Long), (Int, String, String, Long, Long)] {
        override def join(first: (Int, String, Long),
                          second: (Int, Int, String, Long)): (Int, String, String, Long, Long) = {
          (first._1, first._2, second._3, first._3, second._4)
        }
      })
      .print()

    env.execute()

(2) Analysis and output

WM = largest event time seen so far (currentMaxT) - allowed delay. WM only ever increases.
Window trigger condition: WM >= the end boundary of the window, that is, currentMaxT - allowed delay >= the end boundary of the window.

record                         nowTimeStamp   nowTime   currentMaxT  WM        window             window (hour)  WM (hour)
Orders table (orders)
(1,iphone,1585706400000)       1585706400000  10:00:00  10:00:00     10:00:00  10:00:00-12:00:00  [10-12)        10:00
(2,mac,1585706520000)          1585706520000  10:02:00  10:02:00     10:02:00  10:00:00-12:00:00  [10-12)        10:02
(3,huawei,1585706580000)       1585706580000  10:03:00  10:03:00     10:03:00  10:00:00-12:00:00  [10-12)        10:03
(4,pad,1585706700000)          1585706700000  10:05:00  10:05:00     10:05:00  10:00:00-12:00:00  [10-12)        10:05
Shipments table (shipments)
(0,1,shipped,1585710000000)    1585710000000  11:00:00  11:00:00     11:00:00  10:00:00-12:00:00  [10-12)        11
(1,2,delivered,1585731600000)  1585731600000  17:00:00  17:00:00     17:00:00  16:00:00-18:00:00  [16-18)        17
(2,3,shipped,1585713600000)    1585713600000  12:00:00  17:00:00     17:00:00  12:00:00-14:00:00  [12-14)        17
(3,4,shipped,1585711800000)    1585711800000  11:30:00  17:00:00     17:00:00  10:00:00-12:00:00  [10-12)        17
// WM is 17, past the window's end boundary of 12, so the window fires

The window columns list 2-hour boundaries. With the 4-hour window used in the code above, windows aligned to the epoch fall on [08:00, 12:00), [12:00, 16:00) and [16:00, 20:00) in UTC+8, the time zone of this table. The records that share a window, and therefore the join output, are the same.

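The four records of the orders table share a window with the shipments records (0,1,shipped,1585710000000) and (3,4,shipped,1585711800000).

When the shipments record (3,4,shipped,1585711800000) arrives, the WM is 17 (hour), which is greater than the window's end boundary of 12 (hour), so the window fires.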
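The window assignment behind this analysis can be reproduced without a cluster. The following is a minimal sketch in plain Scala, not Flink code: `TumblingWindowJoinSketch`, `windowStart` and `join` are hypothetical names, and the sketch assumes windows aligned to the epoch with offset 0 and non-negative timestamps. It joins the sample records whenever key and window both match, and ignores watermarks and trigger timing.

```scala
object TumblingWindowJoinSketch {
  val HourMs: Long = 60L * 60L * 1000L

  // Start of the tumbling window that contains ts
  // (assumption: epoch-aligned windows, offset 0, ts >= 0).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Sample data from section 1.1: (id, productName, orderTime)
  val orders: Seq[(Int, String, Long)] = Seq(
    (1, "iphone", 1585706400000L),
    (2, "mac", 1585706520000L),
    (3, "huawei", 1585706580000L),
    (4, "pad", 1585706700000L))

  // (shipId, orderId, status, shiptime)
  val shipments: Seq[(Int, Int, String, Long)] = Seq(
    (0, 1, "shipped", 1585710000000L),
    (1, 2, "delivered", 1585731600000L),
    (2, 3, "shipped", 1585713600000L),
    (3, 4, "shipped", 1585711800000L))

  // Inner join: a pair is emitted only if the keys match
  // and both records fall into the same window.
  def join(sizeMs: Long): Seq[(Int, String, String, Long, Long)] =
    for {
      o <- orders
      s <- shipments
      if o._1 == s._2 && windowStart(o._3, sizeMs) == windowStart(s._4, sizeMs)
    } yield (o._1, o._2, s._3, o._3, s._4)

  def main(args: Array[String]): Unit =
    join(4 * HourMs).foreach(println)
}
```

Orders 2 and 3 find no partner because their shipments (17:00 and 12:00) land in later windows, even though the shipment for order 3 is less than two hours after the order. A window join only pairs records that fall inside the same fixed window.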

Output:

Window 4hour
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)

1.3 intervalJoin

Interval join supports INNER JOIN, LEFT JOIN, RIGHT JOIN, and FULL JOIN. A bare JOIN defaults to INNER JOIN.

SEMI JOIN and ANTI JOIN are not yet supported.

TIMEBOUND_EXPRESSION is an interval condition on the time attribute columns of the left and right streams. The following three forms are supported:

ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

1.3.1 intervalJoin API usage

(1) Code

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector

    // Allowed delay: 0s
    val delay = 0

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val left = env.addSource(new OrdersSource)
      .assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
    val right = env.addSource(new ShipmentsSource)
      .assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

    left.print("orderStream=>")
    right.print("shipMentStream=>")

    left
      .keyBy(0)
      .intervalJoin(right.keyBy(1))
      // between only supports event time.
      // Interval: each leftStream record joins rightStream records in [left + 0 hours, left + 4 hours],
      // so an order joins shipment records that occur up to 4 hours after it.
      .between(Time.hours(0), Time.hours(4))
      // Exclude the lower bound
      //.lowerBoundExclusive()
      // Exclude the upper bound
      //.upperBoundExclusive()
      .process(new ProcessJoinFunction[(Int, String, Long), (Int, Int, String, Long), (Int, String, String, Long, Long)]() {
        override def processElement(
            orders: (Int, String, Long),
            shipments: (Int, Int, String, Long),
            context: ProcessJoinFunction[(Int, String, Long), (Int, Int, String, Long), (Int, String, String, Long, Long)]#Context,
            out: Collector[(Int, String, String, Long, Long)]): Unit = {
          // orderId, productName, status, orderTime, shiptime
          out.collect((orders._1, orders._2, shipments._3, orders._3, shipments._4))
        }
      })
      .print("IntervalJoin=>")

    env.execute("IntervalJoinTest")

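(2) Analysis and output

Time interval: each leftStream record is joined with the rightStream records whose timestamps fall in [left + 0 hours, left + 4 hours].

In other words, an order (leftStream) joins the shipment records (rightStream) that occur up to 4 hours after it.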
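The interval rule can be checked against the sample data with a short simulation. This is a sketch in plain Scala under stated assumptions, not the Flink operator: `IntervalJoinSketch`, `inInterval` and `join` are hypothetical names, both bounds are treated as inclusive (the behaviour when `lowerBoundExclusive()` and `upperBoundExclusive()` are not called), and state handling and watermarks are left out.

```scala
object IntervalJoinSketch {
  val HourMs: Long = 60L * 60L * 1000L

  // True when rightTs lies in [leftTs + lowerMs, leftTs + upperMs], both bounds inclusive.
  def inInterval(leftTs: Long, rightTs: Long, lowerMs: Long, upperMs: Long): Boolean =
    rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs

  // Sample data from section 1.1: (id, productName, orderTime)
  val orders: Seq[(Int, String, Long)] = Seq(
    (1, "iphone", 1585706400000L),
    (2, "mac", 1585706520000L),
    (3, "huawei", 1585706580000L),
    (4, "pad", 1585706700000L))

  // (shipId, orderId, status, shiptime)
  val shipments: Seq[(Int, Int, String, Long)] = Seq(
    (0, 1, "shipped", 1585710000000L),
    (1, 2, "delivered", 1585731600000L),
    (2, 3, "shipped", 1585713600000L),
    (3, 4, "shipped", 1585711800000L))

  // Emit a pair when the keys match and the shipment time is inside the order's interval.
  def join(lowerMs: Long, upperMs: Long): Seq[(Int, String, String, Long, Long)] =
    for {
      o <- orders
      s <- shipments
      if o._1 == s._2 && inInterval(o._3, s._4, lowerMs, upperMs)
    } yield (o._1, o._2, s._3, o._3, s._4)

  def main(args: Array[String]): Unit =
    join(0L, 4 * HourMs).foreach(println)
}
```

Order 3 (10:03, shipped at 12:00) now matches because the interval is anchored on each order's own timestamp rather than on fixed window boundaries. Order 2 still finds no partner: its shipment at 17:00 is almost 7 hours after the order.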

Output:

IntervalJoin=>> (1,iphone,shipped,1585706400000,1585710000000)
IntervalJoin=>> (3,huawei,shipped,1585706580000,1585713600000)
IntervalJoin=>> (4,pad,shipped,1585706700000,1585711800000)

1.3.2 intervalJoin SQL usage

    val delay = 0

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val left = env.addSource(new OrdersSource)
      .assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
      .map(ele => Order(ele._1, ele._2, DateUtils.formatTime(ele._3)))
    val right = env.addSource(new ShipmentsSource)
      .assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
      .map(ele => ShipMents(ele._1, ele._2, ele._3, DateUtils.formatTime(ele._4)))

    val tableEnvironment = StreamTableEnvironment.create(env)
    val orderTable: Table = tableEnvironment.fromDataStream(left)
    val shipmentsTable: Table = tableEnvironment.fromDataStream(right)

    val table: Table = tableEnvironment.sqlQuery(
      s"""
         |SELECT o.id, o.productName, s.status
         |FROM $orderTable AS o
         |JOIN $shipmentsTable AS s on o.id = s.orderId AND
         |     o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
         |""".stripMargin)

    tableEnvironment.toAppendStream[(Int, String, String)](table).print("IntervalJoinTest")

    env.execute()

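Note: the SQL and API forms are written slightly differently, but both mean that the order stream joins the shipment-stream records that occur within 4 hours after the order.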

o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
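The SQL condition bounds the order time by the shipment time, while the API form `.between(Time.hours(0), Time.hours(4))` bounds the shipment time by the order time. Rearranging `shipTime - 4h <= orderTime <= shipTime` gives `orderTime <= shipTime <= orderTime + 4h`, which is the same set of pairs. The sketch below checks this numerically; `BoundEquivalenceSketch`, `sqlForm` and `apiForm` are hypothetical names, and both bounds are assumed inclusive.

```scala
object BoundEquivalenceSketch {
  val HourMs: Long = 60L * 60L * 1000L

  // SQL form: o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
  def sqlForm(orderTs: Long, shipTs: Long): Boolean =
    orderTs >= shipTs - 4 * HourMs && orderTs <= shipTs

  // API form: between(0 hours, 4 hours), i.e. shipTs in [orderTs, orderTs + 4h]
  def apiForm(orderTs: Long, shipTs: Long): Boolean =
    shipTs >= orderTs && shipTs <= orderTs + 4 * HourMs

  // Compare both predicates for shipment offsets from -6h to +6h in 30-minute steps.
  def agreeOnAllOffsets(orderTs: Long): Boolean =
    (-12 to 12).forall { step =>
      val shipTs = orderTs + step * (HourMs / 2)
      sqlForm(orderTs, shipTs) == apiForm(orderTs, shipTs)
    }

  def main(args: Array[String]): Unit =
    println(agreeOnAllOffsets(1585706400000L))
}
```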

    orderStream
      .keyBy(0)
      .intervalJoin(shipTimeStream.keyBy(1))
      .between(Time.hours(0), Time.hours(4))

1.4 coGroup

(1) Code

    import java.lang
    import org.apache.flink.api.common.functions.CoGroupFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector

    // Allowed delay: 0s
    val delay = 0
    // Window size: 4 hours
    val window = 4

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val leftStream = env.addSource(new OrdersSource)
      .assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
    val rightStream = env.addSource(new ShipmentsSource)
      .assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

    val leftJoinResult: DataStream[(Int, String, String, Long, Long)] = leftStream
      .coGroup(rightStream)
      // Left join, keyed on id = orderId
      .where(_._1).equalTo(_._2)
      // Tumbling window
      .window(TumblingEventTimeWindows.of(Time.hours(window)))
      // IN1 (Int,String,Long)              id productName orderTime
      // IN2 (Int,Int,String,Long)          shipId orderId status shiptime
      // OUT (Int,String,String,Long,Long)  orderId productName status orderTime shiptime
      .apply(new CoGroupFunction[(Int, String, Long), (Int, Int, String, Long), (Int, String, String, Long, Long)] {
        override def coGroup(first: lang.Iterable[(Int, String, Long)],
                             second: lang.Iterable[(Int, Int, String, Long)],
                             out: Collector[(Int, String, String, Long, Long)]): Unit = {
          for (firstEle
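The listing above breaks off inside the coGroup body. Separately from it, the following plain-Scala sketch illustrates the general idea of a left join over one coGroup group, where both sides arrive as whole collections for a key and window. It is not the author's implementation: `CoGroupLeftJoinSketch` and `leftJoinGroup` are hypothetical names, and the placeholder values `"null"` and `-1L` for unmatched orders are assumptions chosen for illustration.

```scala
object CoGroupLeftJoinSketch {
  type Order = (Int, String, Long)          // id, productName, orderTime
  type Shipment = (Int, Int, String, Long)  // shipId, orderId, status, shiptime
  type Joined = (Int, String, String, Long, Long)

  // Left join for a single key/window group: every order is emitted at least once.
  // Unmatched orders get placeholder values (assumption: "null" status, -1 shiptime).
  def leftJoinGroup(first: Iterable[Order], second: Iterable[Shipment]): Seq[Joined] =
    first.toSeq.flatMap { o =>
      if (second.isEmpty) Seq((o._1, o._2, "null", o._3, -1L))
      else second.toSeq.map(s => (o._1, o._2, s._3, o._3, s._4))
    }

  def main(args: Array[String]): Unit = {
    // Order 1 has a shipment in its group, order 2 does not.
    println(leftJoinGroup(Seq((1, "iphone", 1585706400000L)), Seq((0, 1, "shipped", 1585710000000L))))
    println(leftJoinGroup(Seq((2, "mac", 1585706520000L)), Seq.empty[Shipment]))
  }
}
```

Because coGroup hands over both groups even when one of them is empty, it can express outer joins, which the window join in section 1.2 cannot.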

